RabbitMQ 是除了 Qpid 之外, 唯一实现了AMQP(高级消息协议:Advanced Message Queue Protocol)标准的代理服务器。
虽然AMQP像邮箱那样为离线消费者存储消息,但是这些根据标签路由的消息更为灵活。同时和邮件不同的是,这些消息没有固定的结构,甚至于可以直接存储二进制数据。不同于IM协议,AMQP隐去了消息的发送方和接受方。AMQP也没有“存在”这个概念。
对负载均衡来说,队列是绝佳方案.
目录
生产者创建消息(消息包含两部分: 有效载荷[payload] 和 标签[label]),然后发布(Publish) 到代理服务器(RabbitMQ)
消费者连接到代理服务器,并订阅到队列上。当消费者接受到消息时,它只得到了消息的一部分:有效载荷。在消息路由过程中,消息的标签并没有随有效载荷一同传递。
如果需要明确知道谁生产的AMQP消息的话,就要看生产者是否把消息方信息放入有效载荷中。
AMQP栈
AMQP消息路由必须有三个部分: 交换器、队列、绑定。
生产者把消息发布到交换器上;消息最终到达队列,并被消费者解释;绑定决定了消息如何从路由器 路由到特定队列。
消费者和生产者到底谁去创建队列?
如果你不能承担得起消息进入“黑洞”而丢失的话,你的生产者和消费者就都应该尝试去创建队列。
生产者创建消息
import "github.com/streadway/amqp"
// step 1. 创建 connection
conn, err := amqp.Dial("amqp://guest:guest@127.0.0.1:5672/")
// step 2. 获取 (信道)channel
channel, err := conn.Channel()
// step 3. 在信道上声明交换器 exchange
channel.ExchangeDeclare(
"exchange_name", // exchange name
amqp.ExchangeDirect, // exchange type
false, // durable
false, // autoDelete
false, // internal
false, // noWait
nil, // args amqp.Table
)
// step 4. 声明队列
queue, err := ch.QueueDeclare(
"queue_name", // queue name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments amqp.Table
)
// step 5. 将队列绑定根据路由键绑定到交换器上
channel.QueueBind(
queue.Name, // queue name
"route_key", // route key
"exchange_name", // exchange name
false, // no-wait
nil, // arguments amqp.Table
)
// step 6. 将消息发送到交换器上,交换器会根据路由键将消息发送到对应的队列queue
channel.Publish(
"exchange_name", // exchange
"route_key", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{ // message
ContentType: "text/plain",
Body: []byte("hello world"),
})
消费者订阅消息
import "github.com/streadway/amqp"
// step 1. 创建 connection
conn, err := amqp.Dial("amqp://guest:guest@127.0.0.1:5672/")
// step 2. 获取 (信道)channel
channel, err := conn.Channel()
// step 3. 在信道上声明交换器 exchange
channel.ExchangeDeclare(
"exchange_name", // exchange name
amqp.ExchangeDirect, // exchange type
false, // durable
false, // autoDelete
false, // internal
false, // noWait
nil, // args amqp.Table
)
// step 4. 声明队列
queue, err := ch.QueueDeclare(
"queue_name", // queue name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments amqp.Table
)
// step 5. 将队列绑定根据路由键绑定到交换器上
channel.QueueBind(
queue.Name, // queue name
"route_key", // route key
"exchange_name", // exchange name
false, // no-wait
nil, // arguments amqp.Table
)
forever := make(chan bool)
// step 6. 在信道上订阅队列
messages, err := channel.Consume(
queue.Name, // queue
"", // consumer
false, // autoAck
false, // exclusive
false, // noLocal :The noLocal flag is not supported by RabbitMQ.
false, // no-wait
nil, // arguments amqp.Table
)
// 开启一个 goruntine 获取消息内容
go func() {
for message := range messages {
log.Printf("Received a message: %s", message.Body)
message.Ack(true)
}
}()
fmt.Println("Please ctrl+c to stop")
<-forever